// 重置checkpoint 计数器 long nextCheckpointId = savepoint.getCheckpointID() + 1; checkpointIdCounter.setCount(nextCheckpointId);
LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId); // 从最近一次 Checkpoint 处恢复 State // 获取OperatorState,分配state return restoreLatestCheckpointedStateInternal(new HashSet<>(tasks.values()), true, true, allowNonRestored); }
总结
首先客户端提供 Checkpoint 或 Savepoint 的目录
JM 从给定的目录中找到 _metadata 文件(Checkpoint 的元数据文件)
JM 解析元数据文件,做一些校验,将信息写入到 zk 中,然后准备从这一次 Checkpoint 中恢复任务
JM 拿到所有算子对应的 State,给各个 subtask 分配 StateHandle(状态文件句柄)